Coverage Report

Created: 2026-02-05 09:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
D:\a\scloud-dns\scloud-dns\src\threads\mod.rs
Line
Count
Source
1
use crate::exceptions::SCloudException;
2
use crate::threads::task::ScloudWorkerTask;
3
use crate::{log_debug, utils};
4
use anyhow::Result;
5
use futures_util::StreamExt;
6
use serde::{Deserialize, Serialize};
7
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering};
8
9
pub(crate) mod task;
10
pub(crate) mod tests;
11
12
#[cfg(windows)]
13
mod windows;
14
15
#[cfg(target_os = "linux")]
16
mod linux;
17
18
#[cfg(target_os = "macos")]
19
mod macos;
20
21
#[cfg(not(any(windows, target_os = "linux", target_os = "macos")))]
22
mod others;
23
mod queue;
24
mod workers;
25
26
#[cfg(windows)]
27
mod thread {
28
    pub(crate) use crate::threads::windows::imp as thread_base;
29
    pub(crate) use crate::threads::windows::priority::imp as priority;
30
}
31
32
#[cfg(target_os = "linux")]
33
mod thread {
34
    pub(crate) use crate::threads::linux::imp as thread_base;
35
    pub(crate) use crate::threads::linux::priority::imp as priority;
36
}
37
38
#[cfg(target_os = "macos")]
39
mod thread {
40
    pub(crate) use crate::threads::macos::imp as thread_base;
41
    pub(crate) use crate::threads::macos::priority::imp as priority;
42
}
43
44
#[cfg(not(any(windows, target_os = "linux", target_os = "macos")))]
45
mod thread {
46
    pub(crate) use crate::threads::others::imp as thread_base;
47
    pub(crate) use crate::threads::others::priority::imp as priority;
48
}
49
50
#[allow(unused)]
51
#[allow(non_camel_case_types)]
52
/// Internal worker descriptor and runtime controls.
53
///
54
/// - Mutable runtime knobs are atomics for lock-free internal control updates.
55
/// - Non-atomic fields are treated as immutable after construction (engine-dev only).
56
///
57
/// Notes:
58
/// - `os_thread_id` is for diagnostics only (`0` = unset/invalid). Do not treat it as a liveness guarantee.
59
/// - `priority`/`priority_scope` store the *desired* policy; internal code must explicitly apply it.
60
/// - On macOS, priority is applied via QoS by default. Enabling the optional `mach-rt` feature may
61
///   apply a true Mach real-time policy for `ThreadPriority::REALTIME` (advanced/potentially disruptive).
62
/// - `stack_size_bytes` is a spawn-time knob on most platforms; updates typically only affect *future spawns*
63
///   (e.g., after respawn/restart), not an already-running thread.
64
///
65
/// Incoming:
66
/// - CPU affinity/processor binding
67
pub(crate) struct ScloudWorker {
68
    // IDENTITY
69
    pub(crate) worker_id: u64,
70
    pub(crate) os_thread_id: AtomicU64,
71
    pub(crate) worker_type: WorkerType,
72
73
    // RESOURCES/LIMITS
74
    pub(crate) stack_size_bytes: AtomicUsize,
75
    pub(crate) buffer_budget_bytes: AtomicUsize,
76
    pub(crate) max_stack_size_bytes: AtomicUsize,
77
    pub(crate) max_buffer_budget_bytes: AtomicUsize,
78
79
    // SCHEDULING/PRIORITY
80
    pub(crate) priority: AtomicU8,
81
    pub(crate) priority_scope: AtomicU8,
82
    last_applied_priority: AtomicU8,
83
    last_applied_scope: AtomicU8,
84
85
    // RUNTIME STATE
86
    pub(crate) state: AtomicU8,
87
    pub(crate) shutdown_requested: AtomicBool,
88
    pub(crate) shutdown_mode: AtomicU8,
89
90
    // BACKPRESSURE/IN-FLIGHT
91
    pub(crate) in_flight: AtomicUsize,     // should be 0/1
92
    pub(crate) max_in_flight: AtomicUsize, // prefetch/internal pool
93
94
    // METRICS
95
    pub(crate) jobs_done: AtomicU64,
96
    pub(crate) jobs_failed: AtomicU64,
97
    pub(crate) jobs_retried: AtomicU64,
98
99
    pub(crate) last_job_started_ms: AtomicU64,
100
    pub(crate) last_job_finished_ms: AtomicU64,
101
102
    pub(crate) last_error_code: AtomicU64,
103
    pub(crate) last_error_at_ms: AtomicU64,
104
105
    // CORRELATION/TRACING
106
    pub(crate) last_task_id_hi: AtomicU64, // 128-bit UUID split
107
    pub(crate) last_task_id_lo: AtomicU64,
108
109
    // BROKER RELATED
110
    pub(crate) consumer_tag_hash: AtomicU64, // find which consumer RabbitMQ (hash)
111
}
112
113
#[allow(unused)]
114
impl ScloudWorker {
115
    const NEVER_APPLIED: u8 = 0xFF;
116
117
0
    pub(crate) fn new(worker_id: u64, worker_type: WorkerType) -> Self {
118
0
        Self {
119
0
            worker_id,
120
0
            os_thread_id: AtomicU64::new(0),
121
0
            worker_type,
122
0
            stack_size_bytes: AtomicUsize::new(2 * 1024 * 1024),
123
0
            buffer_budget_bytes: AtomicUsize::new(4 * 1024 * 1024),
124
0
            max_stack_size_bytes: AtomicUsize::new(32 * 1024 * 1024),
125
0
            max_buffer_budget_bytes: AtomicUsize::new(256 * 1024 * 1024),
126
0
            priority: AtomicU8::new(ThreadPriority::NORMAL as u8),
127
0
            priority_scope: AtomicU8::new(PriorityScope::THREAD as u8),
128
0
            last_applied_priority: AtomicU8::new(Self::NEVER_APPLIED),
129
0
            last_applied_scope: AtomicU8::new(Self::NEVER_APPLIED),
130
0
            state: AtomicU8::new(WorkerState::IDLE as u8),
131
0
            shutdown_requested: AtomicBool::new(false),
132
0
            shutdown_mode: AtomicU8::new(ShutdownMode::GRACEFUL as u8),
133
0
            in_flight: AtomicUsize::new(0),
134
0
            max_in_flight: AtomicUsize::new(1),
135
0
            jobs_done: AtomicU64::new(0),
136
0
            jobs_failed: AtomicU64::new(0),
137
0
            jobs_retried: AtomicU64::new(0),
138
0
            last_job_started_ms: AtomicU64::new(0),
139
0
            last_job_finished_ms: AtomicU64::new(0),
140
0
            last_error_code: AtomicU64::new(0),
141
0
            last_error_at_ms: AtomicU64::new(0),
142
0
            last_task_id_hi: AtomicU64::new(0),
143
0
            last_task_id_lo: AtomicU64::new(0),
144
0
            consumer_tag_hash: AtomicU64::new(0),
145
0
        }
146
0
    }
147
148
0
    pub(crate) async fn run(&self) -> Result<(), SCloudException> {
149
        // TODO: check the type of worker and adapt what is doing
150
0
        match self.worker_type {
151
0
            WorkerType::LISTENER => {
152
0
153
0
            }
154
0
            WorkerType::DECODER => {
155
0
156
0
            }
157
0
            WorkerType::QUERY_DISPATCHER => {
158
0
159
0
            }
160
0
            WorkerType::CACHE_LOOKUP => {
161
0
162
0
            }
163
0
            WorkerType::ZONE_MANAGER => {
164
0
165
0
            }
166
0
            WorkerType::RESOLVER => {
167
0
168
0
            }
169
0
            WorkerType::CACHE_WRITER => {
170
0
171
0
            }
172
0
            WorkerType::ENCODER => {
173
0
174
0
            }
175
0
            WorkerType::SENDER => {
176
0
177
0
            }
178
0
            WorkerType::CACHE_JANITOR => {
179
0
180
0
            }
181
0
            WorkerType::METRICS => {
182
0
183
0
            }
184
0
            WorkerType::TCP_ACCEPTOR => {
185
0
186
0
            }
187
0
            _ => {
188
0
189
0
            }
190
        }
191
0
        Ok(())
192
0
    }
193
194
    #[inline]
195
0
    pub fn get_worker_id(&self) -> u64 {
196
0
        self.worker_id
197
0
    }
198
199
    #[inline]
200
0
    pub fn get_os_thread_id(&self) -> u64 {
201
0
        self.os_thread_id.load(Ordering::Relaxed)
202
0
    }
203
204
    #[inline]
205
0
    pub fn get_worker_type(&self) -> WorkerType {
206
0
        self.worker_type
207
0
    }
208
209
    #[inline]
210
0
    pub fn get_stack_size_bytes(&self) -> usize {
211
0
        self.stack_size_bytes.load(Ordering::Relaxed)
212
0
    }
213
214
    #[inline]
215
0
    pub fn get_buffer_budget_bytes(&self) -> usize {
216
0
        self.buffer_budget_bytes.load(Ordering::Relaxed)
217
0
    }
218
219
    #[inline]
220
0
    pub fn get_max_stack_size_bytes(&self) -> usize {
221
0
        self.max_stack_size_bytes.load(Ordering::Relaxed)
222
0
    }
223
224
    #[inline]
225
0
    pub fn get_max_buffer_budget_bytes(&self) -> usize {
226
0
        self.max_buffer_budget_bytes.load(Ordering::Relaxed)
227
0
    }
228
229
    #[inline]
230
0
    pub fn get_priority(&self) -> u8 {
231
0
        self.priority.load(Ordering::Relaxed)
232
0
    }
233
234
    #[inline]
235
0
    pub fn get_priority_scope(&self) -> u8 {
236
0
        self.priority_scope.load(Ordering::Relaxed)
237
0
    }
238
239
    #[inline]
240
0
    pub fn get_last_applied_priority(&self) -> u8 {
241
0
        self.last_applied_priority.load(Ordering::Relaxed)
242
0
    }
243
244
    #[inline]
245
0
    pub fn get_last_applied_scope(&self) -> u8 {
246
0
        self.last_applied_scope.load(Ordering::Relaxed)
247
0
    }
248
249
    #[inline]
250
0
    pub fn get_state(&self) -> u8 {
251
0
        self.state.load(Ordering::Relaxed)
252
0
    }
253
254
    #[inline]
255
0
    pub fn get_shutdown_requested(&self) -> bool {
256
0
        self.shutdown_requested.load(Ordering::Relaxed)
257
0
    }
258
259
    #[inline]
260
0
    pub fn get_shutdown_mode(&self) -> u8 {
261
0
        self.shutdown_mode.load(Ordering::Relaxed)
262
0
    }
263
264
    #[inline]
265
0
    pub fn get_in_flight(&self) -> usize {
266
0
        self.in_flight.load(Ordering::Relaxed)
267
0
    }
268
269
    #[inline]
270
0
    pub fn get_max_in_flight(&self) -> usize {
271
0
        self.max_in_flight.load(Ordering::Relaxed)
272
0
    }
273
274
    #[inline]
275
0
    pub fn get_jobs_done(&self) -> u64 {
276
0
        self.jobs_done.load(Ordering::Relaxed)
277
0
    }
278
279
    #[inline]
280
0
    pub fn get_jobs_failed(&self) -> u64 {
281
0
        self.jobs_failed.load(Ordering::Relaxed)
282
0
    }
283
284
    #[inline]
285
0
    pub fn get_jobs_retried(&self) -> u64 {
286
0
        self.jobs_retried.load(Ordering::Relaxed)
287
0
    }
288
289
    #[inline]
290
0
    pub fn get_last_job_started_ms(&self) -> u64 {
291
0
        self.last_job_started_ms.load(Ordering::Relaxed)
292
0
    }
293
294
    #[inline]
295
0
    pub fn get_last_job_finished_ms(&self) -> u64 {
296
0
        self.last_job_finished_ms.load(Ordering::Relaxed)
297
0
    }
298
299
    #[inline]
300
0
    pub fn get_last_error_code(&self) -> u64 {
301
0
        self.last_error_code.load(Ordering::Relaxed)
302
0
    }
303
304
    #[inline]
305
0
    pub fn get_last_error_at_ms(&self) -> u64 {
306
0
        self.last_error_at_ms.load(Ordering::Relaxed)
307
0
    }
308
309
    #[inline]
310
0
    pub fn get_last_task_id_hi(&self) -> u64 {
311
0
        self.last_task_id_hi.load(Ordering::Relaxed)
312
0
    }
313
314
    #[inline]
315
0
    pub fn get_last_task_id_lo(&self) -> u64 {
316
0
        self.last_task_id_lo.load(Ordering::Relaxed)
317
0
    }
318
319
    #[inline]
320
0
    pub fn get_consumer_tag_hash(&self) -> u64 {
321
0
        self.consumer_tag_hash.load(Ordering::Relaxed)
322
0
    }
323
324
    #[inline]
325
0
    pub fn set_worker_id(&mut self, worker_id: u64) {
326
0
        self.worker_id = worker_id;
327
0
    }
328
329
    #[inline]
330
0
    pub fn set_os_thread_id(&mut self, os_thread_id: u64) {
331
0
        self.os_thread_id.store(os_thread_id, Ordering::Relaxed)
332
0
    }
333
334
    #[inline]
335
0
    pub fn set_worker_type(&mut self, worker_type: WorkerType) {
336
0
        self.worker_type = worker_type;
337
0
    }
338
339
    #[inline]
340
0
    pub fn set_stack_size_bytes(&mut self, stack_size_bytes: usize) {
341
0
        self.stack_size_bytes
342
0
            .store(stack_size_bytes, Ordering::Relaxed);
343
0
    }
344
345
    #[inline]
346
0
    pub fn set_buffer_budget_bytes(&mut self, buffer_budget_bytes: usize) {
347
0
        self.buffer_budget_bytes
348
0
            .store(buffer_budget_bytes, Ordering::Relaxed);
349
0
    }
350
351
    #[inline]
352
0
    pub fn set_max_stack_size_bytes(&mut self, max_stack_size_bytes: usize) {
353
0
        self.max_stack_size_bytes
354
0
            .store(max_stack_size_bytes, Ordering::Relaxed);
355
0
    }
356
357
    #[inline]
358
0
    pub fn set_max_buffer_budget_bytes(&mut self, max_buffer_budget_bytes: usize) {
359
0
        self.max_buffer_budget_bytes
360
0
            .store(max_buffer_budget_bytes, Ordering::Relaxed);
361
0
    }
362
363
    #[inline]
364
0
    pub fn set_priority(&mut self, priority: u8) {
365
0
        self.priority.store(priority, Ordering::Relaxed);
366
0
    }
367
368
    #[inline]
369
0
    pub fn set_priority_scope(&mut self, priority_scope: u8) {
370
0
        self.priority_scope.store(priority_scope, Ordering::Relaxed);
371
0
    }
372
373
    #[inline]
374
0
    pub fn set_last_applied_priority(&mut self, last_applied_priority: u8) {
375
0
        self.last_applied_priority
376
0
            .store(last_applied_priority, Ordering::Relaxed);
377
0
    }
378
379
    #[inline]
380
0
    pub fn set_last_applied_scope(&mut self, last_applied_scope: u8) {
381
0
        self.last_applied_scope
382
0
            .store(last_applied_scope, Ordering::Relaxed);
383
0
    }
384
385
    #[inline]
386
0
    pub fn set_state(&mut self, state: u8) {
387
0
        self.state.store(state, Ordering::Relaxed);
388
0
    }
389
390
    #[inline]
391
0
    pub fn set_shutdown_requested(&mut self, shutdown_requested: bool) {
392
0
        self.shutdown_requested
393
0
            .store(shutdown_requested, Ordering::Relaxed);
394
0
    }
395
396
    #[inline]
397
0
    pub fn set_shutdown_mode(&mut self, shutdown_mode: u8) {
398
0
        self.shutdown_mode.store(shutdown_mode, Ordering::Relaxed);
399
0
    }
400
401
    #[inline]
402
0
    pub fn set_in_flight(&mut self, in_flight: usize) {
403
0
        self.in_flight.store(in_flight, Ordering::Relaxed);
404
0
    }
405
406
    #[inline]
407
0
    pub fn set_max_in_flight(&mut self, max_in_flight: usize) {
408
0
        self.max_in_flight.store(max_in_flight, Ordering::Relaxed);
409
0
    }
410
411
    #[inline]
412
0
    pub fn set_jobs_done(&mut self, jobs_done: u64) {
413
0
        self.jobs_done.store(jobs_done, Ordering::Relaxed);
414
0
    }
415
416
    #[inline]
417
0
    pub fn set_jobs_failed(&mut self, jobs_failed: u64) {
418
0
        self.jobs_failed.store(jobs_failed, Ordering::Relaxed);
419
0
    }
420
421
    #[inline]
422
0
    pub fn set_jobs_retried(&mut self, jobs_retried: u64) {
423
0
        self.jobs_retried.store(jobs_retried, Ordering::Relaxed);
424
0
    }
425
426
    #[inline]
427
0
    pub fn set_last_job_started_ms(&mut self, last_job_started_ms: u64) {
428
0
        self.last_job_started_ms
429
0
            .store(last_job_started_ms, Ordering::Relaxed);
430
0
    }
431
432
    #[inline]
433
0
    pub fn set_last_job_finished_ms(&mut self, last_job_finished_ms: u64) {
434
0
        self.last_job_finished_ms
435
0
            .store(last_job_finished_ms, Ordering::Relaxed);
436
0
    }
437
438
    #[inline]
439
0
    pub fn set_last_error_code(&mut self, last_error_code: u64) {
440
0
        self.last_error_code
441
0
            .store(last_error_code, Ordering::Relaxed);
442
0
    }
443
444
    #[inline]
445
0
    pub fn set_last_error_at_ms(&mut self, last_error_at_ms: u64) {
446
0
        self.last_error_at_ms
447
0
            .store(last_error_at_ms, Ordering::Relaxed);
448
0
    }
449
450
    #[inline]
451
0
    pub fn set_last_task_id_hi(&mut self, last_task_id_hi: u64) {
452
0
        self.last_task_id_hi
453
0
            .store(last_task_id_hi, Ordering::Relaxed);
454
0
    }
455
456
    #[inline]
457
0
    pub fn set_last_task_id_lo(&mut self, last_task_id_lo: u64) {
458
0
        self.last_task_id_lo
459
0
            .store(last_task_id_lo, Ordering::Relaxed);
460
0
    }
461
462
    #[inline]
463
0
    pub fn set_consumer_tag_hash(&mut self, consumer_tag_hash: u64) {
464
0
        self.consumer_tag_hash
465
0
            .store(consumer_tag_hash, Ordering::Relaxed);
466
0
    }
467
}
468
469
#[allow(unused)]
470
#[allow(non_camel_case_types)]
471
#[repr(u8)]
472
pub(crate) enum WorkerState {
473
    INIT = 0,
474
    IDLE = 1,
475
    BUSY = 2,
476
    PAUSED = 3,
477
    STOPPING = 4,
478
    STOPPED = 5,
479
}
480
481
#[allow(unused)]
482
#[allow(non_camel_case_types)]
483
#[repr(u8)]
484
pub(crate) enum ShutdownMode {
485
    GRACEFUL = 0,
486
    IMMEDIATE = 1,
487
}
488
489
#[allow(unused)]
490
#[allow(non_camel_case_types)]
491
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Eq)]
492
pub enum WorkerType {
493
    LISTENER,
494
    DECODER,
495
    QUERY_DISPATCHER,
496
    CACHE_LOOKUP,
497
    ZONE_MANAGER,
498
    RESOLVER,
499
    CACHE_WRITER,
500
    ENCODER,
501
    SENDER,
502
503
    CACHE_JANITOR,
504
505
    METRICS,
506
    TCP_ACCEPTOR,
507
}
508
509
#[repr(u8)]
510
#[allow(unused)]
511
#[allow(non_camel_case_types)]
512
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
513
pub enum ClassPriority {
514
    IDLE = 0,
515
    BELOW_NORMAL = 1,
516
    NORMAL = 2,
517
    ABOVE_NORMAL = 3,
518
    HIGH = 4,
519
    REALTIME = 5,
520
}
521
522
impl ClassPriority {
523
    #[inline]
524
    #[allow(unused)]
525
0
    pub fn from_u8(v: u8) -> Self {
526
0
        match v {
527
0
            0 => Self::IDLE,
528
0
            1 => Self::BELOW_NORMAL,
529
0
            2 => Self::NORMAL,
530
0
            3 => Self::ABOVE_NORMAL,
531
0
            4 => Self::HIGH,
532
0
            5 => Self::REALTIME,
533
            _ => {
534
0
                debug_assert!(false, "invalid ClassPriority value: {}", v);
535
0
                Self::NORMAL
536
            }
537
        }
538
0
    }
539
540
    #[inline]
541
    #[allow(unused)]
542
0
    pub fn to_unix_nice(self) -> i32 {
543
0
        match self {
544
0
            Self::IDLE => 19,
545
0
            Self::BELOW_NORMAL => 10,
546
0
            Self::NORMAL => 0,
547
0
            Self::ABOVE_NORMAL => -5,
548
0
            Self::HIGH => -10,
549
0
            Self::REALTIME => -20, // Not true RT; this is "strongly favored timesharing" at best.
550
        }
551
0
    }
552
}
553
554
#[repr(u8)]
555
#[allow(unused)]
556
#[allow(non_camel_case_types)]
557
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
558
pub enum ThreadPriority {
559
    IDLE = 0,
560
    LOW = 1,
561
    BELOW_NORMAL = 2,
562
    NORMAL = 3,
563
    ABOVE_NORMAL = 4,
564
    HIGH = 5,
565
    REALTIME = 6,
566
}
567
568
impl ThreadPriority {
569
    #[inline]
570
    #[allow(unused)]
571
0
    fn from_u8(v: u8) -> Self {
572
0
        match v {
573
0
            0 => Self::IDLE,
574
0
            1 => Self::LOW,
575
0
            2 => Self::BELOW_NORMAL,
576
0
            3 => Self::NORMAL,
577
0
            4 => Self::ABOVE_NORMAL,
578
0
            5 => Self::HIGH,
579
0
            6 => Self::REALTIME,
580
            _ => {
581
0
                debug_assert!(false, "invalid ThreadPriority value: {}", v);
582
0
                Self::NORMAL
583
            }
584
        }
585
0
    }
586
}
587
588
#[repr(u8)]
589
#[allow(unused)]
590
#[allow(non_camel_case_types)]
591
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
592
pub enum PriorityScope {
593
    THREAD = 0,
594
    PROCESS = 1,
595
    USER = 2,
596
    PROCESS_GROUP = 3,
597
}
598
599
impl PriorityScope {
600
    #[inline]
601
    #[allow(unused)]
602
0
    fn from_u8(v: u8) -> Self {
603
0
        match v {
604
0
            0 => Self::THREAD,
605
0
            1 => Self::PROCESS,
606
0
            2 => Self::USER,
607
0
            3 => Self::PROCESS_GROUP,
608
            _ => {
609
0
                debug_assert!(false, "invalid PriorityScope value: {}", v);
610
0
                Self::THREAD
611
            }
612
        }
613
0
    }
614
}
615
616
pub struct SpawnConfig<'a> {
617
    pub name: Option<&'a str>,
618
    pub stack_size: Option<usize>,
619
}
620
621
impl<'a> Default for SpawnConfig<'a> {
622
0
    fn default() -> Self {
623
0
        Self {
624
0
            name: None,
625
0
            stack_size: None,
626
0
        }
627
0
    }
628
}
629
630
#[allow(unused)]
631
0
pub fn new<F, T>(cfg: SpawnConfig<'_>, f: F) -> std::thread::JoinHandle<T>
632
0
where
633
0
    F: FnOnce() -> T + Send + 'static,
634
0
    T: Send + 'static,
635
{
636
0
    thread::thread_base::new(cfg, f)
637
0
}
638
639
// TODO: should return an ScloudException
640
#[allow(unused)]
641
0
pub fn set_priority(scope: PriorityScope, p: ThreadPriority) -> std::io::Result<()> {
642
0
    thread::priority::set_priority(scope, p)
643
0
}